/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.spi.discovery.tcp;

import java.net.InetSocketAddress;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_DISCOVERY_METRICS_QNT_WARN;

/**
 *
 */
abstract class TcpDiscoveryImpl {
    /** Response OK. */
    protected static final int RES_OK = 1;

    /** Response CONTINUE JOIN. */
    protected static final int RES_CONTINUE_JOIN = 100;

    /** Response WAIT. */
    protected static final int RES_WAIT = 200;

    /** Response join impossible. */
    protected static final int RES_JOIN_IMPOSSIBLE = 255;

    /** How often the warning message should occur in logs to prevent log spam. */
    public static final long LOG_WARN_MSG_TIMEOUT = 60 * 60 * 1000L;

    /** Debug log date formatter. */
    private static final DateTimeFormatter DEBUG_FORMATTER =
        DateTimeFormatter.ofPattern("[HH:mm:ss,SSS]").withZone(ZoneId.systemDefault());

    /** */
    protected final TcpDiscoverySpi spi;

    /** */
    protected final IgniteLogger log;

    /** */
    protected volatile TcpDiscoveryNode locNode;

    /** Debug mode. */
    protected boolean debugMode;

    /** Debug messages history. */
    private int debugMsgHist = 512;

    /** Received messages. */
    protected ConcurrentLinkedDeque<String> debugLogQ;

    /** Logging a warning message when metrics quantity exceeded a specified number. */
    protected int METRICS_QNT_WARN = getInteger(IGNITE_DISCOVERY_METRICS_QNT_WARN, DFLT_DISCOVERY_METRICS_QNT_WARN);

    /** */
    protected long endTimeMetricsSizeProcessWait = System.currentTimeMillis();

    /** */
    protected final ServerImpl.DebugLogger debugLog = new DebugLogger() {
        /** {@inheritDoc} */
        @Override public boolean isDebugEnabled() {
            return log.isDebugEnabled();
        }

        /** {@inheritDoc} */
        @Override public void debug(String msg) {
            log.debug(msg);
        }
    };

    /** */
    protected final ServerImpl.DebugLogger traceLog = new DebugLogger() {
        /** {@inheritDoc} */
        @Override public boolean isDebugEnabled() {
            return log.isTraceEnabled();
        }

        /** {@inheritDoc} */
        @Override public void debug(String msg) {
            log.trace(msg);
        }
    };

    /** Tracing. */
    protected Tracing tracing;

    /**
     * Upcasts collection type.
     *
     * @param c Initial collection.
     * @return Resulting collection.
     */
    protected static <T extends R, R> Collection<R> upcast(Collection<T> c) {
        A.notNull(c, "c");

        return (Collection<R>)c;
    }

    /**
     * @param spi Adapter.
     */
    TcpDiscoveryImpl(TcpDiscoverySpi spi) {
        this.spi = spi;

        log = spi.log;

        if (spi.ignite() instanceof IgniteEx)
            tracing = ((IgniteEx)spi.ignite()).context().tracing();
        else
            tracing = new NoopTracing();
    }

    /**
     * This method is intended for troubleshooting purposes only.
     *
     * @param debugMode {code True} to start SPI in debug mode.
     */
    public void setDebugMode(boolean debugMode) {
        this.debugMode = debugMode;
    }

    /**
     * This method is intended for troubleshooting purposes only.
     *
     * @param debugMsgHist Message history log size.
     */
    public void setDebugMessageHistory(int debugMsgHist) {
        this.debugMsgHist = debugMsgHist;
    }

    /**
     * @param discoMsg Discovery message.
     * @param msg Message.
     */
    protected void debugLog(@Nullable TcpDiscoveryAbstractMessage discoMsg, String msg) {
        assert debugMode;

        String msg0 = DEBUG_FORMATTER.format(Instant.now()) +
            '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
            "-" + locNode.internalOrder() + "] " +
            msg;

        debugLogQ.add(msg0);

        int delta = debugLogQ.size() - debugMsgHist;

        for (int i = 0; i < delta && debugLogQ.size() > debugMsgHist; i++)
            debugLogQ.poll();
    }

    /**
     * @return Local node ID.
     */
    public UUID getLocalNodeId() {
        return spi.locNode.id();
    }

    /**
     * @return Configured node ID (actual node ID can be different if client reconnects).
     */
    public UUID getConfiguredNodeId() {
        return spi.cfgNodeId;
    }

    /**
     * @param msg Error message.
     * @param e Exception.
     */
    protected void onException(String msg, Exception e) {
        spi.getExceptionRegistry().onException(msg, e);
    }

    /**
     * Called when a local node either received from or sent to a remote node a message.
     */
    protected void onMessageExchanged() {
        // No-op
    }

    /**
     * @param log Logger.
     */
    public abstract void dumpDebugInfo(IgniteLogger log);

    /**
     * @return SPI state string.
     */
    public abstract String getSpiState();

    /**
     * @return Message worker queue current size.
     */
    public abstract int getMessageWorkerQueueSize();

    /**
     * @return Coordinator ID.
     */
    public abstract UUID getCoordinator();

    /**
     * @return Collection of remote nodes.
     */
    public abstract Collection<ClusterNode> getRemoteNodes();

    /**
     * @param nodeId Node id.
     * @return Node with given ID or {@code null} if node is not found.
     */
    @Nullable public abstract ClusterNode getNode(UUID nodeId);

    /**
     * @param nodeId Node id.
     * @return {@code true} if node alive, {@code false} otherwise.
     */
    public abstract boolean pingNode(UUID nodeId);

    /**
     * Tells discovery SPI to disconnect from topology.
     *
     * @throws IgniteSpiException If failed.
     */
    public abstract void disconnect() throws IgniteSpiException;

    /**
     * @param msg Message.
     * @throws IgniteException If failed.
     */
    public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;

    /**
     * @param nodeId Node id.
     * @param warning Warning message to be shown on all nodes.
     */
    public abstract void failNode(UUID nodeId, @Nullable String warning);

    /**
     * Dumps ring structure to logger.
     *
     * @param log Logger.
     */
    public abstract void dumpRingStructure(IgniteLogger log);

    /**
     * Get current topology version.
     *
     * @return Current topology version.
     */
    public abstract long getCurrentTopologyVersion();

    /**
     * @param igniteInstanceName Ignite instance name.
     * @throws IgniteSpiException If failed.
     */
    public abstract void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException;

    /**
     * Will start TCP server if applicable and not started yet.
     *
     * @return Port this instance bound to.
     * @throws IgniteSpiException If failed.
     */
    public int boundPort() throws IgniteSpiException {
        return 0;
    }

    /**
     * @return connection check interval.
     */
    public long connectionCheckInterval() {
        return 0;
    }

    /**
     * @throws IgniteSpiException If failed.
     */
    public abstract void spiStop() throws IgniteSpiException;

    /**
     * @param spiCtx Spi context.
     * @throws IgniteSpiException If failed.
     */
    public abstract void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException;

    /**
     * @param t Thread.
     * @return Status as string.
     */
    protected static String threadStatus(Thread t) {
        if (t == null)
            return "N/A";

        return t.isAlive() ? "alive" : "dead";
    }

    /**
     * Leave cluster and try to join again.
     *
     * @throws IgniteSpiException If failed.
     */
    public abstract void reconnect() throws IgniteSpiException;

    /**
     * <strong>FOR TEST ONLY!!!</strong>
     * <p>
     * Simulates this node failure by stopping service threads. So, node will become
     * unresponsive.
     * <p>
     * This method is intended for test purposes only.
     */
    abstract void simulateNodeFailure();

    /**
     * FOR TEST PURPOSE ONLY!
     */
    public abstract void brakeConnection();

    /**
     * @param maxHops Maximum hops for {@link TcpDiscoveryRingLatencyCheckMessage}.
     */
    public abstract void checkRingLatency(int maxHops);

    /**
     * <strong>FOR TEST ONLY!!!</strong>
     *
     * @return Worker threads.
     */
    protected abstract Collection<IgniteSpiThread> threads();

    /**
     * @param nodeId Node ID.
     * @param metrics Metrics.
     * @param cacheMetrics Cache metrics.
     * @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
     */
    public abstract void updateMetrics(UUID nodeId,
        ClusterMetrics metrics,
        Map<Integer, CacheMetrics> cacheMetrics,
        long tsNanos);

    /**
     * @param ackTimeout Acknowledgement timeout.
     * @return {@code True} if acknowledgement timeout is less or equal to
     * maximum acknowledgement timeout, {@code false} otherwise.
     */
    protected boolean checkAckTimeout(long ackTimeout) {
        if (ackTimeout > spi.getMaxAckTimeout()) {
            LT.warn(log, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
                "(consider increasing 'maxAckTimeout' configuration property) " +
                "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']');

            return false;
        }

        return true;
    }

    /** */
    public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
        for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
            UUID nodeId = e.getKey();

            TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();

            Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
                msg.cacheMetrics().get(nodeId) : Collections.emptyMap();

            if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
                && cacheMetrics.size() >= METRICS_QNT_WARN) {
                log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" +
                    "To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");

                endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT;
            }

            updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);

            for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
                updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
        }
    }

    /**
     * @param addrs Addresses.
     */
    protected static List<String> toOrderedList(Collection<InetSocketAddress> addrs) {
        List<String> res = new ArrayList<>(addrs.size());

        for (InetSocketAddress addr : addrs)
            res.add(addr.toString());

        Collections.sort(res);

        return res;
    }

    /**
     * @param msg Message.
     * @return Message logger.
     */
    protected final DebugLogger messageLogger(TcpDiscoveryAbstractMessage msg) {
        return msg.traceLogLevel() ? traceLog : debugLog;
    }

    /**
     *
     */
    interface DebugLogger {
        /**
         * @return {@code True} if debug logging is enabled.
         */
        boolean isDebugEnabled();

        /**
         * @param msg Message to log.
         */
        void debug(String msg);
    }
}
